KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…#12248
KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…#12248mimaison merged 5 commits intoapache:trunkfrom
Conversation
…-827) This implements KIP-827: https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API Add TotalBytes and UsableBytes to DescribeLogDirsResponse Add matching getters on LogDirDescription
| assertEquals(mockLogMgr.liveLogDirs.size, responses.size) | ||
| responses.foreach { response => | ||
| assertEquals(Errors.NONE.code, response.errorCode) | ||
| assertTrue(response.totalBytes >= 0) |
There was a problem hiding this comment.
Is there a reason due to which totalBytes could be 0? Perhaps we want to assert a > 0 condition here?
There was a problem hiding this comment.
True this assertion is a bit too defensive! > 0 should be fine. I'll push an update.
| public class DescribeLogDirsResponse extends AbstractResponse { | ||
|
|
||
| public static final long INVALID_OFFSET_LAG = -1L; | ||
| public static final long UNKNOWN_VOLUME_BYTES = -1L; |
There was a problem hiding this comment.
Do we still want this?
Correct me if I am wrong here but I thought that we reached a conclusion in the KIP that Optional will cover the scenario when client is using a newer API and broker is old.
There was a problem hiding this comment.
The wire protocol does not have nullable integers. So the new fields are defined to use -1 as the default value in the protocol:
This constant is used to map the value in DescribeLogDirsResponse to an Optional:
| private final OptionalLong usableBytes; | ||
|
|
||
| public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) { | ||
| this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES); |
There was a problem hiding this comment.
Instead of UNKNOWN_VOLUME_BYTES, this could be null or Optional.Empty()? That way we could get rid of UNKNOWN_VOLUME_BYTES completely.
| } | ||
| } | ||
| result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap)); | ||
| result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap, logDirResult.totalBytes(), logDirResult.usableBytes())); |
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
Show resolved
Hide resolved
| return totalBytes; | ||
| } | ||
|
|
||
| public OptionalLong usableBytes() { |
There was a problem hiding this comment.
Also do we want to say something about the contraints on usableBytes and totalBytes (individually and compared with each other).
There was a problem hiding this comment.
I've added some details, let me know if it's not what you had in mind
|
Thank you for making the suggested changes. The current set of changes don't contain a fully end to end integration test (KafkaAdminClientTest uses response mocks). Please make a change in |
|
Thanks @divijvaidya for the review. I've updated the end to end tests to cover this new feature. |
viktorsomogyi
left a comment
There was a problem hiding this comment.
@mimaison thanks for this feature, it's a really useful addition for us as well.
Tried out your code locally, ran the tests, they all work. I don't have any particular comments, it looks good to me.
…-827)
This implements KIP-827: https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API
Add TotalBytes and UsableBytes to DescribeLogDirsResponse
Add matching getters on LogDirDescription
Committer Checklist (excluded from commit message)